Skip to content

Conversation

@mathantunes
Copy link
Contributor

@mathantunes mathantunes commented Nov 11, 2025

User description

This PR introduces the a concept of initial checkpoint position for new subscriptions.

As discussed on #457 and #456.

The new option is added to the SubscriptionWithCheckpointOptions type and managed directly on the children of EventSubscriptionWithCheckpoint.

The implementation so far covers the SQL event stores and EventStoreDB. Redis is NOT yet covered.

If this approach is deemed acceptable:

  • Write tests
  • Document the new option

Rework of #457


PR Type

Enhancement


Description

  • Add CheckpointInitialPosition enum to control subscription start position

  • Allow subscriptions to start from end of stream for integration purposes

  • Implement initial position logic for KurrentDB and SQL event stores

  • Add comprehensive tests for starting consumption from end position


Diagram Walkthrough

flowchart LR
  A["CheckpointInitialPosition<br/>enum"] -->|"added to"| B["SubscriptionWithCheckpointOptions"]
  B -->|"used by"| C["EventSubscriptionWithCheckpoint"]
  C -->|"implemented in"| D["KurrentDB Subscriptions"]
  C -->|"implemented in"| E["SQL Subscriptions"]
  D -->|"tested by"| F["KurrentDB Tests"]
  E -->|"tested by"| G["SQL Tests"]
Loading

File Walkthrough

Relevant files
Enhancement
6 files
CheckpointInitialPosition.cs
New enum for subscription start position                                 
+6/-0     
SubscriptionOptions.cs
Add InitialPosition property to checkpoint options             
+1/-0     
EventSubscriptionWithCheckpoint.cs
Expose CheckpointStore as protected property                         
+2/-1     
AllStreamSubscription.cs
Implement initial position logic for all stream                   
+7/-1     
StreamSubscription.cs
Implement initial position logic for stream                           
+7/-1     
SqlSubscriptionBase.cs
Implement initial position logic for SQL subscriptions     
+6/-0     
Tests
3 files
SubscribeToAll.cs
Add test helper for end position consumption                         
+18/-0   
SubscribeTests.cs
Add KurrentDB tests for end position subscription               
+12/-1   
SubscribeTests.cs
Add PostgreSQL tests for end position subscription             
+11/-0   

@mathantunes mathantunes force-pushed the feat/subscription-initial-position branch 3 times, most recently from b59f744 to cd7fc2e Compare November 11, 2025 01:51
@mathantunes mathantunes force-pushed the feat/subscription-initial-position branch from f82c525 to fe01b3d Compare November 17, 2025 21:26
@mathantunes mathantunes marked this pull request as ready for review November 17, 2025 21:26
@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Nov 17, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🟡
🎫 #456
🟢 Allow subscriptions to start from the current position ("now") instead of from the
beginning of the stream
Enable configuration of the initial subscription position for integration purposes
Provide a way to specify the starting point when registering a new subscription
Prevent processing of historical events when subscribing for integration scenarios
Codebase Duplication Compliance
🟢
No codebase code duplication found New Components Detected:
- SubscribeToAllFromEnd
- SubscribeToAllFromEnd
Custom Compliance
🟢
Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Nov 17, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Handle invalid end-of-stream position correctly
Suggestion Impact:The suggestion was directly implemented in the commit. The code adds the EndOfStream.Invalid check with exception throwing, adds .NoContext() to async calls, and improves the error message to include "subscription" in the text. The commit matches the suggestion's intent and implementation almost exactly.

code diff:

-            var endOfStream = await GetSubscriptionEndOfStream(cancellationToken);
+            var endOfStream = await GetSubscriptionEndOfStream(cancellationToken).NoContext();
             if (endOfStream == EndOfStream.Invalid) {
-                throw new InvalidOperationException($"Could not get the end of the stream for {SubscriptionId}");
+                throw new InvalidOperationException($"Could not get the end of the stream for subscription {SubscriptionId}");
+            }
+            await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken).NoContext();
+            position = endOfStream.Position;

Add a check for an invalid EndOfStream from GetSubscriptionEndOfStream. If it's
invalid, throw an exception to prevent the subscription from starting at an
incorrect position when CheckpointInitialPosition.End is used.

src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs [217-221]

 if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) {
-    var endOfStream = await GetSubscriptionEndOfStream(cancellationToken);
-    await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken);
+    var endOfStream = await GetSubscriptionEndOfStream(cancellationToken).NoContext();
+    if (endOfStream == EndOfStream.Invalid) {
+        throw new InvalidOperationException($"Could not get the end of the stream for subscription {SubscriptionId}");
+    }
+    await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken).NoContext();
     position = endOfStream.Position;
 }

[Suggestion processed]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a bug in the error handling path where a failure to get the end of the stream would cause the subscription to incorrectly start from the beginning instead of the end.

Medium
General
Remove redundant test assertion

Remove the redundant assertion Assert.That(fixture.Handler.Count).IsEqualTo(10)
as the event count is already validated by AssertCollection.

src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs [31-33]

 await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [.. testEvents]).Validate(cancellationToken);
 await fixture.StopSubscription();
-await Assert.That(fixture.Handler.Count).IsEqualTo(10);

[Suggestion processed]

Suggestion importance[1-10]: 2

__

Why: The suggestion correctly points out a redundant assertion in a test method, but removing it is a minor code cleanup with no impact on functionality or correctness.

Low
  • Update

@mathantunes mathantunes force-pushed the feat/subscription-initial-position branch from fe01b3d to 7e389db Compare November 17, 2025 21:34
@github-actions
Copy link

Test Results

 51 files  + 34   51 suites  +34   33m 36s ⏱️ + 21m 34s
281 tests + 12  281 ✅ + 12  0 💤 ±0  0 ❌ ±0 
846 runs  +566  846 ✅ +566  0 💤 ±0  0 ❌ ±0 

Results for commit 7e389db. ± Comparison against base commit 20e513c.

This pull request removes 5 and adds 17 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/15/2025 4:21:41 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/15/2025 4:21:41 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(9f9500c2-a728-4355-9693-4b0bf3e9ba06)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-15T16:21:42.3737910+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2025-11-15T16:21:42.3737910+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-15T16:21:42.3737910+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(091bd3b8-d18b-4665-825c-70d5a1645d1d)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(0c5c866c-25ba-47e2-a95d-48e3ba7759fe)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/20/2025 1:19:20 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/20/2025 1:19:20 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/20/2025 1:19:21 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/20/2025 1:19:21 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/20/2025 1:19:28 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/20/2025 1:19:28 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(39ac54f2-7644-4033-9cc1-e2ac27b25183)
Eventuous.Tests.KurrentDB.Subscriptions.SubscribeToAllFromEnd ‑ Esdb_ShouldStartConsumptionFromEnd
…

alexeyzimarev and others added 2 commits November 24, 2025 12:53
Co-authored-by: qodo-merge-for-open-source[bot] <189517486+qodo-merge-for-open-source[bot]@users.noreply.github.com>
@alexeyzimarev alexeyzimarev merged commit 3ee475a into Eventuous:dev Nov 24, 2025
@alexeyzimarev
Copy link
Contributor

/add_docs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants